import json

from kafka import KafkaProducer

from util.config_center import ConfigCenter


class KafkaProducerConnect:
    """生产者连接"""

    def __init__(self):
        # 获取配置信息
        self.clusters = ConfigCenter.get_kafka_config()
        self.producer = self.init_producer()

    def init_producer(self):
        """
        初始化
        :return:
        """
        producer = KafkaProducer(bootstrap_servers=self.clusters)
        return producer

    def send_msg(self, topic, msg, key=None):
        """
        发送消息
        :param topic: topic
        :param msg: 消息值
        :param key:
        :return:
        """
        msg = json.dumps(msg).encode("utf-8")
        flag = self.producer.send(topic, msg, key=key)
        return flag
